We've been working on some fun projects lately, namely dask and Anaconda Cluster, so we experimented with analyzing all of the GitHub data for 2015 on an EC2 cluster (a distributed-memory environment). We use Anaconda Cluster to set up a 50-node cluster on EC2, then use dask for the analysis.
Dask is a tool for out-of-core, parallel data analysis. Recently we added a distributed-memory scheduler for running dask on clusters. We will also be using dask.bag, which provides an API for operations on unordered lists (like sets, but allowing duplicates). It is great for dealing with semi-structured data like JSON blobs or log files. More blogposts about dask can be found here or here.
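For readers new to dask.bag, here is a minimal sketch on made-up in-memory records (not the GitHub data) showing the usual pattern: build a bag, chain lazy operations, then call compute.
# Minimal dask.bag sketch; the records below are invented for illustration only.
import dask.bag as db
b = db.from_sequence([{'user': 'alice', 'n': 2},
                      {'user': 'bob',   'n': 1},
                      {'user': 'alice', 'n': 4}])
b.filter(lambda r: r['n'] > 1).pluck('n').sum().compute()  # 6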
Anaconda Cluster lets us easily set up clusters and manage the packages in them with conda. Spinning up the cluster for this demo takes just a few lines:
acluster create my-cluster -p aws_profile # create the cluster
acluster install notebook dask-cluster # install plugins that set up an IPython notebook and a dask cluster
acluster conda install boto ujson # install the conda packages we need for the demo
acluster open notebook # open the IPython notebook in the browser to interact with the cluster
While dask.distributed is well integrated with Anaconda Cluster, it isn't restricted to it. This blogpost shows how to set up a dask.distributed network manually, and these docs show how to set up dask.distributed from any IPyParallel cluster.
Projects for Python analytics in a distributed-memory environment
We took data from githubarchive.com, from January 2015 to May 2015, and put it on S3. We chose S3 because there are nice Python libraries for interacting with it, and we get great bandwidth between EC2 and S3. (The script for gathering this data is here.)
Let's inspect the data first so we can find something to analyze and learn the data schema. You can inspect the data yourself in the githubarchive-data S3 bucket.
In [1]:
### cut me out
import boto
bucket = boto.connect_s3().get_bucket('githubarchive-data')  # this is a public bucket

from fnmatch import fnmatchcase
keys = bucket.list()

def data_size(key_str):
    """Total size (in GB) of all keys in the bucket whose names match
    key_str, which may contain the wildcards ? and *.
    """
    matches = [k for k in keys if fnmatchcase(k.name, key_str)]
    return sum(k.size for k in matches) * 1e-9  # in GB

data_size('2015-*')
Out[1]:
In [2]:
import dask.bag as db
import ujson as json
# take one file from the bucket and load it as JSON objects; note that
# gz decompression happens automatically at compute time.
b = db.from_s3('githubarchive-data', '2015-01-01-0.json.gz').map(json.loads)
In [3]:
first = b.take(1)[0] # take the first json object from the file
first
Out[3]:
In [4]:
first.keys() # top level keys in this json object
Out[4]:
The type field looks interesting. What are the possible types, and how often does each occur? We can inspect this with dask.bag.frequencies.
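On a toy bag (not the GitHub data), frequencies returns (value, count) pairs, which is the shape of result to expect from the cell below; a minimal sketch:
# Toy illustration of frequencies; the values are invented and the output order may vary.
import dask.bag as db
db.from_sequence(['PushEvent', 'ForkEvent', 'PushEvent']).frequencies().compute()
# e.g. [('PushEvent', 2), ('ForkEvent', 1)]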
In [5]:
%time b.pluck('type').frequencies().compute()
Out[5]:
So most events are pushes; that is not surprising. Let's ask, "Who pushes the most?"
We do this by filtering for PushEvents, then counting the frequency of usernames for those pushes, and finally taking the top 5.
In [6]:
pushes = b.filter(lambda x: x['type'] == 'PushEvent')  # keep only the push events
names = pushes.pluck('actor').pluck('login')  # get the login names
top_5 = names.frequencies().topk(5, key=lambda name_count: name_count[1])  # list the top 5 pushers
%time top_5.compute()  # run the above computations
Out[6]:
These users pushed the most, but a push can contain multiple commits, so we can ask, "Who pushed the most commits?"
We can figure this out by grouping on username, then summing the number of commits from every push for each user. More technically speaking, we want to GroupBy on usernames, so that for each username we get a list of their PushEvents. Then we reduce each PushEvent by taking a count of its commits, and reduce those counts by summing them for each user. So we are grouping, then reducing.
However, there are algorithms that group and reduce simultaneously, avoiding expensive shuffle operations and running much faster. In dask.bag we have foldby. Analogous methods include toolz.reduceby and, in PySpark, RDD.combineByKey.
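As a rough sketch of the idea on made-up records (not the GitHub data), toolz.reduceby groups and reduces in a single pass:
# Toy sketch of grouping and reducing in one pass with toolz.reduceby.
from toolz import reduceby

events = [{'actor': {'login': 'alice'}, 'payload': {'commits': [1, 2]}},
          {'actor': {'login': 'bob'},   'payload': {'commits': [1]}},
          {'actor': {'login': 'alice'}, 'payload': {'commits': [1]}}]

def get_login(e):
    """Grouping key: the username of the event."""
    return e['actor']['login']

def add_commit_count(total, e):
    """Fold the commit count of one event into a running total."""
    return total + len(e['payload']['commits'])

reduceby(get_login, add_commit_count, events, 0)  # -> {'alice': 3, 'bob': 1}
dask.bag.foldby works the same way, with an extra combine step to merge totals computed on different partitions.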
In [7]:
def get_logins(x):
    """The key for foldby, like a groupby key: get the username from a PushEvent."""
    return x['actor']['login']

def binop(total, x):
    """Add the number of commits in a PushEvent to the running total."""
    return total + len(x['payload']['commits'])

def combine(total1, total2):
    """Combine commit counts from two partial results."""
    return total1 + total2

commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
top_commits = commits.topk(5, key=lambda name_count: name_count[1])
%time top_commits.compute()
Out[7]:
Recall that this dask.Bag covers just one file. Now that we know how to get the top committers, we'll gradually load more data and benchmark the dask.distributed scheduler against the default dask.multiprocessing scheduler.
First we set up the distributed scheduler. Then we write a benchmarking function, both_benchmark, which can be found here; it simply times the analysis with each scheduler and prints the results nicely.
In [8]:
# dask.distributed setup
import dask
from dask.distributed import Client
dc = Client('tcp://localhost:9000')  # client connected to 50 nodes, 2 workers per node
# pass dc.get to compute functions to use the distributed scheduler.

## cut out the rest of this cell
# make a top 5 committers function
import time
from pprint import pprint

def both_benchmark(data_pattern):
    bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
    pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
    commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top5 = commits.topk(5, lambda x: x[1])

    # time the default compute and the distributed compute
    default_start = time.time()
    default_result = top5.compute()
    default_time = time.time() - default_start

    dist_start = time.time()
    dist_result = top5.compute(get=dc.get)
    dist_time = time.time() - dist_start

    # assert we have the same result
    assert default_result == dist_result

    # size of the computed data
    size = data_size(data_pattern)

    # general details
    print("To compute {0:.4f} GB of data the default scheduler took {1:.2f} seconds, the distributed scheduler took {2:.2f} seconds".format(size, default_time, dist_time))
    print("")
    # speedup = default_time / dist_time
    print("Distributed scheduler is \t\t\t\t\t{:.2f} times faster.".format(default_time / dist_time))
    # single-node bandwidth = size / default_time
    print("Default scheduler compute bandwidth: \t\t\t\t{:.2f} MB/s".format(1e3 * size / default_time))
    # distributed bandwidth = size / dist_time
    print("Distributed scheduler compute bandwidth: \t\t\t{:.2f} MB/s".format(1e3 * size / dist_time))
    # per-node bandwidth = size / (time * nodes)
    print("Compute bandwidth per node with distributed scheduler: \t\t{:.3f} MB/(s node)".format(1e3 * size / (dist_time * 50)))
    print('')
    print("Analysis results:")
    pprint(dist_result)
Let's benchmark a single file first.
In [9]:
both_benchmark('2015-01-01-0.json.gz')
Dask.distributed is comparable to the default scheduler here; that is not surprising for this small amount of data.
In [10]:
both_benchmark('2015-01-15-*.json.gz')
Already a good speedup.
In [11]:
both_benchmark('2015-01-1?-*.json.gz')
Computing this on one node is possible, but it is annoying to wait so long, so we continue with just the distributed scheduler. distributed_benchmark can be found here.
In [12]:
### cut me out
def distributed_benchmark(data_pattern):
    bag = db.from_s3('githubarchive-data', data_pattern).map(json.loads)
    pushes = bag.filter(lambda x: x['type'] == 'PushEvent')
    commits = pushes.foldby(get_logins, binop, initial=0, combine=combine)
    top5 = commits.topk(5, lambda x: x[1])

    dist_start = time.time()
    dist_result = top5.compute(get=dc.get)
    dist_time = time.time() - dist_start

    # size of the computed data
    size = data_size(data_pattern)

    # general details
    print("To compute {0:.4f} GB of data the distributed scheduler took {1:.2f} seconds".format(size, dist_time))
    print('')
    # distributed bandwidth = size / dist_time
    print("Distributed scheduler compute bandwidth: \t\t\t{:.2f} MB/s".format(1e3 * size / dist_time))
    # per-node bandwidth = size / (time * nodes)
    print("Compute bandwidth per node with distributed scheduler: \t\t{:.3f} MB/(s node)".format(1e3 * size / (dist_time * 50)))
    print('')
    print("Analysis results:")
    pprint(dist_result)
In [13]:
distributed_benchmark('2015-01-*.json.gz')
In [14]:
distributed_benchmark('2015-*.json.gz')
This is experimental work. We ran into the following problems while doing this experiment:
We also have some lingering issues regarding performance: